
// Copyright (c) WanSheng Intelligent Corp. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.


#define LOG_TAG "redis"



#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>

#include "wa_edge_client_api.h"
#include "wa_redis_api.h"
#include "lib_crypto.h"
#include "string_parser.h"


/* Per-listener state handed to thread_redis_event() as its thread argument. */
typedef struct
{
    char *hostname;             // Redis host to connect to; heap string (strdup'ed by the creator)
    int port;                   // Redis TCP port
    WA_REDIS_CALLBACK callback; // invoked by the listener thread for each matching pmessage
    void * user_data;           // opaque pointer passed back unchanged as the callback's last argument
    redisContext *c;            // NOTE(review): not read or written by the code in this file section
    char * sub_channels;        // PSUBSCRIBE pattern; the thread falls back to "ch:*" when this is NULL
} redis_listen_t;


/*
 * Open a connection to the local Redis server (127.0.0.1:6379).
 *
 * timeout_ms: connect timeout in milliseconds.
 *
 * Returns the connected context (the caller releases it with redisFree()),
 * or NULL when the context could not be allocated or the connection failed.
 * Failures are reported on stdout.
 */
redisContext * wa_redis_connect(int timeout_ms)
{
    struct timeval tv;
    tv.tv_sec = timeout_ms / 1000;
    tv.tv_usec = (timeout_ms % 1000) * 1000;

    redisContext *ctx = redisConnectWithTimeout("127.0.0.1", 6379, tv);

    if (ctx == NULL)
    {
        printf("wa_redis_connect: Connection error: can't allocate redis context\n");
        return NULL;
    }

    if (ctx->err)
    {
        // hiredis hands back a context even on failure; it still has to be freed.
        printf("wa_redis_connect: Connection error: %s\n", ctx->errstr);
        redisFree(ctx);
        return NULL;
    }

    return ctx;
}

/*
 * Read the value ("v") and timestamp ("tm") fields of the hash "dss:<group>:<dss>".
 *
 * c:      connected (blocking) Redis context, or NULL to use a temporary
 *         connection to 127.0.0.1:6379 that is closed before returning.
 * buffer: receives the NUL-terminated value.
 * size:   capacity of buffer in bytes, including the terminator.
 * tm:     receives the stored timestamp; set to 0 on entry. May be NULL.
 *
 * Returns buffer on success. Returns NULL when the connection or a command
 * fails, when either field is missing or not a string reply, or when the
 * value does not fit into buffer. Note that buffer may already contain the
 * value when NULL is returned because the timestamp lookup failed.
 */
char * wa_redis_read_dss(redisContext *c,
        const char * group,
        const char * dss,
        char * buffer,
        int size,
        time_t * tm)
{
    redisReply *reply = NULL;
    char key[100] = {0};
    char * ret = NULL;
    redisContext * new_ctx = NULL;

    if (tm != NULL)
        *tm = 0;

    if (buffer == NULL || size <= 0)
        return NULL;

    if (c == NULL)
    {
        // redisCommand() only returns a reply on a blocking context, so the
        // temporary connection must be a blocking one.
        new_ctx = redisConnect("127.0.0.1", 6379);
        if (new_ctx == NULL || new_ctx->err) {
            if (new_ctx)
            {
                printf("wa_redis_read_dss: connection error: %s\n", new_ctx->errstr);
                redisFree(new_ctx);
            }

            return NULL;
        }

        c = new_ctx;
    }

    snprintf(key, sizeof(key), "dss:%s:%s", group, dss);

    reply = (redisReply *)redisCommand(c, "HGET %s v", key);

    // A nil reply has str == NULL; an error reply carries an error message, not the value.
    if (reply == NULL || reply->type != REDIS_REPLY_STRING || reply->str == NULL)
        goto end;

    if (strlen(reply->str) >= (size_t)size)
        goto end;

    strcpy(buffer, reply->str);

    freeReplyObject(reply);
    reply = NULL;

    reply = (redisReply *)redisCommand(c, "HGET %s tm", key);
    if (reply == NULL || reply->type != REDIS_REPLY_STRING || reply->str == NULL)
        goto end;

    if (tm != NULL)
        *tm = (time_t) strtol(reply->str, NULL, 10);

    ret = buffer;

end:
    // Single exit point: release whatever reply is still pending and the
    // temporary connection, if one was opened here.
    if (reply)
        freeReplyObject(reply);
    if (new_ctx)
        redisFree(new_ctx);
    return ret;
}


/*
 * Release the heap string owned by a parsed value.
 *
 * Only string-typed values own memory; every other type is left untouched.
 * The structure itself is not freed and its fields are not reset.
 */
void wa_redis_value_clean(redis_value_t * value)
{
    if(value->value_type != Redis_Value_String)
        return;

    if(!value->sv)
        return;

    free(value->sv);
}

/*
 * Free an array of property/value pairs, including the property names and
 * any string payloads owned by the entries.
 *
 * values: array to release; the array itself is freed as well.
 * size:   number of entries in the array.
 */
void wa_redis_free_values(redis_property_value_t * values, int size)
{
    int idx = 0;

    while(idx < size)
    {
        redis_property_value_t * entry = &values[idx];

        // Only string-typed values own a heap buffer.
        if(entry->value.value_type == Redis_Value_String && entry->value.sv)
        {
            free(entry->value.sv);
        }

        if(entry->property)
        {
            free(entry->property);
        }

        idx++;
    }

    free(values);
}

/*
 * Parse a stored value string into a redis_value_t.
 *
 * Two input forms are handled:
 *   - key/value form, ';'-separated, with the keys "tm" (timestamp),
 *     "t" (type, taken from the first character) and "v" (value);
 *   - a bare value, when none of the keys "tm", "t" or "v" is found.
 *
 * When no type is given, the value is classified as a float if is_number()
 * accepts it and as a string otherwise, and is then stored accordingly.
 *
 * For string values, value->sv is a heap copy that the caller releases
 * (see wa_redis_value_clean()).
 *
 * Returns false when the input is NULL or blank, when "tm"/"t" is present
 * without "v", when the type is not recognised, or when memory runs out.
 */
bool wa_redis_parse_value(const char * value_str, redis_value_t * value)
{
    Redis_Value_Type vt = Redis_Value_NA;
    char buf[128] = {0};
    char * value_buf = buf;
    char * alloc_value = NULL;
    int value_buf_len = sizeof(buf);
    bool ret = true;

    if(value_str == NULL || value == NULL) return false;

    while(*value_str == ' ')value_str++;
    if(*value_str == 0) return false;

    size_t str_len = strlen(value_str);

    // The stack buffer is not big enough: use a heap buffer that can hold
    // the whole input, since no extracted field can be longer than that.
    if(str_len >= sizeof(buf))
    {
        value_buf_len = (int) str_len + 1;
        value_buf = alloc_value = (char*) malloc_z(value_buf_len);
        if(alloc_value == NULL) return false;
    }

    bool found = false;
    value->time = 0;
    if(find_key_value((char*)value_str, str_len, "tm", value_buf, value_buf_len, ';'))
    {
        value->time = atoi(value_buf);
        found = true;
    }

    if(find_key_value((char*)value_str, str_len, "t", value_buf, value_buf_len, ';'))
    {
        // The type is encoded as a single character.
        vt = (Redis_Value_Type) (value_buf[0]);
        found = true;
    }

    if(find_key_value((char*)value_str, str_len, "v", value_buf, value_buf_len, ';'))
    {
        // value_buf now holds the payload.
    }
    else if(!found)
    {
        // value only
        value_buf = (char*)value_str;
    }
    else
    {
        // "tm" and/or "t" present but no "v": nothing to parse.
        if(alloc_value) free(alloc_value);
        return false;
    }

    // No explicit type: infer it from the payload itself, then fall through
    // to the regular conversion so the payload is actually stored.
    if(vt == Redis_Value_NA)
    {
        if(is_number(value_buf))
            vt = Redis_Value_Float;
        else
            vt = Redis_Value_String;
    }

    if(vt == Redis_Value_Float)
    {
        value->fv = atof(value_buf);
    }
    else if(vt == Redis_Value_String)
    {
        value->sv = strdup(value_buf);
        if(value->sv == NULL) ret = false;
    }
    else if(vt == Redis_Value_Bool)
    {
        if(strcasecmp((const char*)value_buf, "true") == 0)
            value->bv = 1;
        else if(strcasecmp((const char*)value_buf, "false") == 0)
            value->bv = 0;
        else
            value->bv = atoi((const char*)value_buf);
    }
    else
    {
        WARNING2("Invalid value type: %s", value_buf);
        ret = false;
    }

    value->value_type = vt;
    if(alloc_value) free(alloc_value);
    return ret;
}


/*
 * Format the Redis key "v:p:<di>:<res>" into the caller's buffer.
 *
 * buf / buf_len: destination and its capacity; the output is truncated
 *                (and still NUL-terminated) when it does not fit.
 *
 * Returns buf, so the call can be used inline as a command argument.
 */
char * wa_redis_value_key(const char * di,
        const char * res, char* buf, int buf_len)
{
    (void) snprintf(buf, buf_len, "v:p:%s:%s", di, res);

    return buf;
}

/*
 * Read one property of a resource from the hash "v:p:<di>:<res>" and parse
 * it into *value.
 *
 * c:     connected Redis context.
 * pt:    property (hash field) name; must be a non-empty string.
 * value: receives the parsed value (see wa_redis_parse_value()).
 *
 * Returns true when the field exists and parses successfully. Returns false
 * for a NULL context or output, an empty property name, a failed command,
 * a missing field (nil reply) or an error reply.
 */
bool wa_redis_read_property(redisContext *c,
        const char * di,
        const char * res,
        const char * pt,
        redis_value_t * value)
{
    redisReply *reply;
    char key[100] = {0};
    bool ret = false;

    if(c == NULL || value == NULL)
        return false;

    if(pt == NULL || *pt == 0)
        return false;

    reply = (redisReply *)redisCommand(c,"HGET %s %s",
        wa_redis_value_key(di, res, key, sizeof(key)),
        pt);

    if(reply == NULL)
    {
        return false;
    }

    // A missing field yields a nil reply whose str is NULL, and an error
    // reply carries an error message; only a string reply holds a value.
    if(reply->type == REDIS_REPLY_STRING && reply->str != NULL)
    {
        ret = wa_redis_parse_value(reply->str, value);
    }

    freeReplyObject(reply);
    return ret;
}


/*
 * Read every property of a resource from the hash "v:p:<di>:<res>".
 *
 * c:            connected Redis context.
 * property_num: receives the number of entries in the returned array;
 *               always 0 when NULL is returned. May be NULL.
 *
 * Returns a heap array of property/value pairs that the caller releases
 * with wa_redis_free_values(), or NULL when the command fails, the hash is
 * empty, an element is not a string, a value cannot be parsed, or memory
 * runs out. A failure on any single entry discards the whole result.
 */
redis_property_value_t* wa_redis_read_resource(redisContext *c,
        const char * di,
        const char * res,
        int * property_num)
{
    redisReply *reply;
    char key[100] = {0};
    redis_property_value_t * values = NULL;
    bool success = false;
    int size = 0;

    // Make the count well-defined on every failure path.
    if(property_num) *property_num = 0;

    reply = (redisReply *)redisCommand(c,"HGETALL %s",
        wa_redis_value_key(di, res, key, sizeof(key)));

    if(reply == NULL)
    {
        return NULL;
    }

    if(reply->type == REDIS_REPLY_ARRAY && reply->elements > 1)
    {
        // HGETALL replies with alternating field names and values.
        size = reply->elements / 2;
        values = (redis_property_value_t*) malloc_z(size*sizeof(redis_property_value_t));
        if(values == NULL)
        {
            goto end;
        }

        for(int i = 0; i < size; i++)
        {
            redisReply *name_reply = reply->element[2*i];
            redisReply *value_reply = reply->element[2*i + 1];

            if(name_reply->type != REDIS_REPLY_STRING || name_reply->str == NULL) goto end;
            if(value_reply->type != REDIS_REPLY_STRING || value_reply->str == NULL) goto end;

            values[i].property = strdup(name_reply->str);
            if(values[i].property == NULL) goto end;

            if(!wa_redis_parse_value(value_reply->str, &values[i].value))
            {
                goto end;
            }
        }
        success = true;
    }

end:
    freeReplyObject(reply);
    if(!success)
    {
        // The array is zero-initialised, so a partially filled one can be
        // released with the regular helper.
        if(values) wa_redis_free_values(values, size);
        return NULL;
    }

    if(property_num) *property_num = size;
    return values;
}




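/*
Example of a pattern-subscription (PSUBSCRIBE) message as delivered by Redis.
The reply is an array of four elements:
1) "pmessage"   -- message kind
2) "b.*"        -- pattern that matched
3) "b.b"        -- channel the message was published on
4) "aaa"        -- payload
*/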

/*
 * Listener thread: subscribes to a channel pattern and forwards every
 * matching pmessage to the registered callback.
 *
 * arg: a heap-allocated redis_listen_t describing host, port, callback,
 *      user data and the optional PSUBSCRIBE pattern (defaults to "ch:*").
 *
 * The first three characters of the channel name (the "ch:" prefix) are
 * skipped, then the remainder selects the event kind:
 *   "reg:"     -> Device_Status        "v:"       -> Data_Value
 *   "dss:"     -> DSS_Value            "iagent:"  -> Agent_Info
 *   "ams:cfg:" -> AMS_Config_Update    "ams:sw:"  -> AMS_Software_Update
 *   anything else -> Other_Sub (with the unstripped remainder).
 *
 * The thread retries the initial connection every 2 seconds, and on a read
 * failure reconnects (retrying every 5 seconds) and subscribes again.
 */
extern "C" void * thread_redis_event (void * arg)
{
    redis_listen_t * listener = (redis_listen_t*)arg;
    redisContext *c = NULL;
    redisReply *reply = NULL;
    bool redis_connected = false;
    bool need_subscribe = true;
    WA_REDIS_CALLBACK fn = (WA_REDIS_CALLBACK) listener->callback;
    char * channel = (char*) "ch:*";
    int conn_errs = 0;

    prctl (PR_SET_NAME, "redis-listener");

    if(listener->sub_channels)
        channel = listener->sub_channels;

    // Initial connection: keep trying until the server is reachable.
    while(1)
    {
        struct timeval timeout = { 1, 500000 }; // 1.5 seconds
        c = redisConnectWithTimeout(listener->hostname, listener->port, timeout);
        if (c == NULL || c->err) {
            if (c)
            {
                // Log only the first failure of a streak to avoid flooding the log.
                if(conn_errs == 0)
                {
                    WALOG("thread_redis_event: Connection error: %s\n", c->errstr);
                }
                conn_errs++;
                redisFree(c);
                c = NULL;
            }
            else
            {
                WARNING("thread_redis_event: Connection error: can't allocate redis context\n");
            }

            sleep(2);
            continue;
        }

        break;
    }

    conn_errs = 0;

    redis_connected = true;

    while(1)
    {

        if(!redis_connected)
        {
            int ret = redisReconnect(c);
            if(REDIS_OK != ret)
            {
                sleep(5);
                continue;
            }

            // A fresh connection has no subscriptions.
            redis_connected = true;
            need_subscribe = true;
        }


        if(need_subscribe)
        {
            char cmd[256];
            snprintf(cmd, sizeof(cmd), "PSUBSCRIBE %s", channel);

            // The command is only queued here; it is sent by the first
            // redisGetReply() call below.
            int ret = redisAppendCommand(c, cmd);

            if(REDIS_OK != ret)
            {
                redis_connected = false;
                sleep(2);
                continue;
            }

            need_subscribe = false;

            WARNING("thread_redis_event: sub cmd ok, [%s]", cmd);
        }

        /* PSUBSCRIBE format:
        1) "pmessage"
        2) "ch:v*"
        3) "ch:v:046e5f51b67c_mb_dev1:/resource:b"
        4) "10"
        */

        while(redis_connected)
        {
            reply = NULL;
            int ret = redisGetReply(c, (void **)&reply);
            if(REDIS_OK == ret)
            {
                if (reply != NULL && reply->type == REDIS_REPLY_ARRAY) {
                    // Only dereference str on elements that actually carry text
                    // (e.g. the subscribe confirmation contains an integer).
                    if(reply->elements == 4 &&
                            reply->element[0]->str != NULL &&
                            reply->element[1]->str != NULL &&
                            reply->element[2]->str != NULL &&
                            reply->element[3]->str != NULL &&
                            strcmp(reply->element[0]->str, "pmessage") == 0 &&
                            strcmp(reply->element[1]->str, channel) == 0)
                    {
                        // Skip the 3-character "ch:" prefix, but never step
                        // past the end of a shorter channel name.
                        char * type = reply->element[2]->str;
                        for(int skipped = 0; skipped < 3 && *type != 0; skipped++)
                            type++;

                        char * content = reply->element[3]->str;

                        if(strncmp(type, "reg:", 4) == 0)
                        {
                            char * di = type + 4;
                            fn(Device_Status, di, content, listener->user_data);
                        }
                        else if(strncmp(type, "v:", 2) == 0)
                        {
                            // ch:v:{di}:{res}:{property}
                            char * res = type + 2;
                            fn(Data_Value, res, content, listener->user_data);
                        }
                        else if(strncmp(type, "dss:", 4) == 0)
                        {
                            char * group = type + 4;
                            fn(DSS_Value, group, content, listener->user_data);
                        }
                        else if(strncmp(type, "iagent:", 7) == 0)
                        {
                            char * item = type + 7;
                            fn(Agent_Info, item, content, listener->user_data);
                        }
                        else if(strncmp(type, "ams:cfg:", 8) == 0)
                        {
                            // "ch:ams:cfg:{software name}:{target type}:{target id}"
                            // content: config path
                            char * item = type + 8;
                            fn(AMS_Config_Update, item, content, listener->user_data);
                        }
                        else if(strncmp(type, "ams:sw:", 7) == 0)
                        {
                            // "ch:ams:sw:{category}:{product name}"
                            char * item = type + 7;
                            fn(AMS_Software_Update, item, content, listener->user_data);
                        }
                        else
                        {
                            fn(Other_Sub, type, content, listener->user_data);
                        }
                    }
                }

                if(reply != NULL)
                    freeReplyObject(reply);
            }
            else
            {
                // Read failed: leave the inner loop and reconnect.
                redis_connected = false;
            }
        }
    }

    // Not reached at present: the loop above has no exit. Kept as the
    // shutdown sequence for when stopping the monitor is implemented.
    fn(Quit_Monitor, NULL, NULL, listener->user_data);

    free(listener->hostname);
    free(listener);
    redisFree(c);

    return NULL;
}



/*
 * Start a listener thread that subscribes to the given channel pattern on
 * the local Redis server (127.0.0.1:6379) and reports messages through
 * callback.
 *
 * callback:  invoked from the listener thread; must not be NULL.
 * user_data: passed back unchanged to callback.
 * channels:  PSUBSCRIBE pattern. The pointer is stored, not copied, so the
 *            string must stay valid for as long as the listener runs.
 *
 * Returns the thread id converted to int on success, -1 on failure (NULL
 * callback, out of memory, or the thread could not be created).
 * NOTE(review): the conversion of pthread_t to int may truncate; the value
 * is an opaque handle only.
 */
extern "C" int wa_redis_start_monitor_with_channel(WA_REDIS_CALLBACK callback, void * user_data, char* channels)
{
    if(callback == NULL)
        return -1;

    redis_listen_t * arc = (redis_listen_t*) malloc(sizeof(redis_listen_t));
    if(arc == NULL)
        return -1;
    memset(arc, 0, sizeof(*arc));

    arc->hostname = strdup("127.0.0.1");
    if(arc->hostname == NULL)
    {
        free(arc);
        return -1;
    }
    arc->port =  6379;
    arc->callback = callback;
    arc->user_data = user_data;
    arc->sub_channels = channels;

    pthread_t tid;

    // pthread_create() returns 0 on success and an error number on failure.
    if (pthread_create (&tid, NULL, thread_redis_event, arc) != 0)
    {
        // The thread never started, so ownership of arc stays here.
        free(arc->hostname);
        free(arc);
        return -1;
    }

    return (int) tid;
}


/*
 * Start a listener on the default channel pattern "ch:*".
 *
 * Convenience wrapper around wa_redis_start_monitor_with_channel(); the
 * arguments and the return value are passed through unchanged.
 */
extern "C" int wa_redis_start_monitor(WA_REDIS_CALLBACK callback, void * user_data)
{
    char * default_pattern = (char*)"ch:*";

    return wa_redis_start_monitor_with_channel(callback, user_data, default_pattern);
}

/*
 * Stop a running monitor.
 *
 * Placeholder: the id is ignored and nothing is stopped.
 * Always returns 0.
 */
int stop_redis_monitor(int id)
{
    // Todo: implement it
    (void) id;

    const int status = 0;
    return status;
}

/*
 * Split a subject of the form "<di>:<res>[:<pt>]" in place.
 *
 * The first two ':' separators in subject are overwritten with NUL, so the
 * returned pointers all refer into the caller's buffer.
 *
 * res: receives the part after the first ':'.
 * pt:  receives the part after the second ':'; left untouched when the
 *      subject has no second ':'.
 *
 * Returns subject (now terminated after the first part), or NULL when the
 * subject contains no ':' at all, in which case nothing is modified.
 */
char * wa_redis_subject_decompose(char * subject, char **res, char ** pt)
{
    char * first_sep = strchr(subject, ':');
    if(first_sep == NULL)
        return NULL;

    *first_sep = 0;
    *res = first_sep + 1;

    char * second_sep = strchr(*res, ':');
    if(second_sep != NULL)
    {
        *second_sep = 0;
        *pt = second_sep + 1;
    }

    return subject;
}


/*
 * Split an AMS config subject in place into its ':'-separated parts.
 *
 * Delegates to wa_redis_subject_decompose(): the second part is stored in
 * *target_type, the remainder (if any) in *target_id, and the first part
 * is returned (NULL when the subject has no ':').
 */
char* wa_redis_subject_AMS_CFG(char* subject, char** target_type, char** target_id)
{
    char * first_part = wa_redis_subject_decompose(subject, target_type, target_id);

    return first_part;
}

